package cn.lagou.homework

import java.util.regex.{Matcher, Pattern}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LogDemo {

  /** Matches a log line whose request path ends in an `.mp4` file.
    * Group 1 = first whitespace-delimited token (the client IP), group 2 = the `.mp4` file name.
    */
  val ipPattern: Pattern =
    Pattern.compile("""(\S+) .+/(\S+\.mp4) .*""")

  /** Matches a log line with a bracketed timestamp, an HTTP status of 200/206/304 and a byte count.
    * Group 1 = timestamp text up to the first space after `[`, group 2 = status, group 3 = bytes.
    */
  val flowPattern: Pattern =
    Pattern.compile(""".+ \[(.+?) .+ (200|206|304) (\d+) .+""")

  /** Bytes in one GiB; a Double so that hourly traffic is not truncated by integer division. */
  private val BytesPerGiB: Double = 1024.0 * 1024 * 1024

  /** Default input file, used when no path is given on the command line. */
  private val DefaultInputPath: String = "data/cdn.txt"

  /** Extracts `(video, ip)` from a line matching [[ipPattern]].
    *
    * @param line one raw log line
    * @return `Some((videoFileName, clientIp))`, or `None` when the line does not match
    */
  private[homework] def parseVideoIp(line: String): Option[(String, String)] = {
    val m: Matcher = ipPattern.matcher(line)
    if (m.matches()) Some((m.group(2), m.group(1))) else None
  }

  /** Extracts `(hour, bytes)` from a line matching [[flowPattern]].
    *
    * The hour is taken as the second `:`-separated field of the captured timestamp.
    * This assumes a timestamp shaped like `dd/Mon/yyyy:HH:mm:ss`; confirm against the data.
    *
    * @param line one raw log line
    * @return `Some((hour, bytes))`, or `None` when the line does not match or the
    *         timestamp contains no `:` field (instead of throwing)
    */
  private[homework] def parseHourFlow(line: String): Option[(String, Long)] = {
    val m: Matcher = flowPattern.matcher(line)
    if (!m.matches()) None
    else m.group(1).split(":").lift(1).map(hour => (hour, m.group(3).toLong))
  }

  /** Runs three reports over a CDN access log: unique IPs, unique IPs per video, and
    * traffic per hour.
    *
    * @param args optional; `args(0)` overrides the input path (default `data/cdn.txt`)
    */
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Always release the context, even if a job fails.
    try {
      sc.setLogLevel("WARN")

      // Load the data; cached because it feeds three independent reports.
      val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
      val dataRDD: RDD[String] = sc.textFile(inputPath).cache()

      // 1. Count unique IPs (the first whitespace-delimited token of each non-blank line).
      val ipCounts: RDD[(String, Int)] =
        dataRDD.map(_.trim)
          .filter(_.nonEmpty)
          .map(line => (line.split("\\s+")(0), 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false, 1)
          .cache() // used by both take() and count() below
      println("--------独立IP数-------------")
      ipCounts.take(10).foreach(println)
      println(s"独立ip数：${ipCounts.count()}")

      // 2. Count unique IPs per video. Non-matching lines are dropped by the Option
      //    instead of a sentinel value that could leak into the results.
      val videoIpCounts: RDD[(String, Int)] =
        dataRDD.flatMap(line => parseVideoIp(line).toList)
          .distinct()
          .map { case (video, _) => (video, 1) }
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false, 1)
      println("----------每个视频独立IP数-------------------")
      // collect() so the output is printed on the driver, not on executors.
      videoIpCounts.collect().foreach(println)

      // 3. Traffic per hour of the day, reported in GiB with two decimals.
      println("----------每个小时的流量------------------")
      dataRDD.flatMap(line => parseHourFlow(line).toList)
        .reduceByKey(_ + _, 1)
        .collect()
        .sortBy(_._1)
        .foreach { case (hour, bytes) =>
          println(f"${hour}时 CDN流量${bytes / BytesPerGiB}%.2fG")
        }
    } finally {
      sc.stop()
    }
  }
}
